DiFlow

Toni Verbeiren

TOC

  1. Introduction

  2. DiFlow

  3. Step by step

  4. Putting it all together

  5. Extra

Introduction

Asynchronous processing

Typical script-based or Makefile-based approach

run_step1 [--parallel]
run_step2 [--parallel]
...

DataFlow Programming Model

In computer programming, dataflow programming is a programming paradigm that models a program as a directed graph of the data flowing between operations…

But …

I invite you to take a look at

https://github.com/nf-core/rnaseq/blob/master/rnaseq.nf

And this is already using DSL2!

We can do better

We want the flow to be expressed explicitly and concisely:

    input_ \
        | ... \
        | ( parse_header & parse_map ) \
        | join \
        | ... \
        | plot_map \
        | convert_plot \
        | rename \
        | toSortedList{ a,b -> a[0] <=> b[0] }  \
        | ... \
        | combine_plots

Functional Reactive Programming

Think about apply in R, map in Python, JavaScript, etc.

map is a function that transforms one collection (Set, List, …) into another.

This can also be events in time:

A stream of events

… or a stream of data files

Parallel operations on a stream

Gather the results into a new stream

Join two streams
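These stream operations can be sketched in plain Python, with lists standing in for streams. This is an illustration of the ideas only, not Nextflow's API:

```python
# Plain Python lists stand in for streams of events/files
# (illustrative sketch only, not Nextflow's API).
stream = [1, 2, 3]

# Parallel operations on a stream: apply a function to every element.
mapped = list(map(lambda x: x * 10, stream))

# Gather the results into a new stream carrying one collected value.
gathered = [mapped]

# Join two streams on a shared key (compare Nextflow's `join` on the ID).
left = {"a": 1, "b": 2}
right = {"a": "x", "b": "y"}
joined = {k: (left[k], right[k]) for k in left.keys() & right.keys()}
```

The same shapes (map, gather, join) recur throughout the pipeline below, only on channels instead of lists.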

We take ideas from FRP

DiFlow

D[SL2] I[mprovement] Flow

is

  • An abstraction layer for NextFlow DSL2

  • A set of conventions

  • Tooling

General idea

A pipeline is a combination of modules:

  • A module contains one step in a larger process
  • Each module is independent
  • A module can be tested
  • A module runs in a dedicated and versioned container
  • A module takes a triplet as argument:

[ ID, data, config ]
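As a minimal sketch of this convention (plain Python, not DiFlow's actual implementation), a module is then just a function from triplet to triplet:

```python
# A DiFlow-style module as a function over [ID, data, config] triplets
# (illustrative Python sketch, not DiFlow's actual implementation).
def add_module(triplet):
    id_, data, config = triplet
    term = config.get("term", 1)   # per-module settings live in config
    return [id_, data + term, config]

events = [["1", 1, {"term": 10}], ["2", 2, {"term": 10}]]
results = [add_module(t) for t in events]
```

Because every module has the same signature, modules compose by simple chaining, which is what the steps below build up in Nextflow.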

Step by step

Step 1 - Operate on a stream

// Step - 1
workflow step1 {
  Channel.from(1) \
    | map{ it + 1 } \
    | view{ it }
}

Step 2 - Operate on a stream in parallel

// Step - 2
workflow step2 {
  Channel.from( [ 1, 2, 3 ] ) \
    | map{ it + 1 } \
    | view{ it }
}

Step 3 - Operate on a stream using a process

// Step - 3
process add {
  input:
    val(input)
  output:
    val(output)
  exec:
    output = input + 1
}
workflow step3 {
  Channel.from( [ 1, 2, 3 ] ) \
    | add \
    | view{ it }
}

Step 4 - map is synchronous

// Step - 4
def waitAndReturn(it) { sleep(2000); return it }
workflow step4 {
  Channel.from( [ 1, 2, 3 ] ) \
    | map{ (it == 2) ? waitAndReturn(it) : it } \
    | map{ it + 1 } \
    | view{ it }
}

Step 5 - Introduce an ID

// Step - 5
process addTuple {
  input:
    tuple val(id), val(input)
  output:
    tuple val("${id}"), val(output)
  exec:
    output = input + 1
}
workflow step5 {
  Channel.from( [ 1, 2, 3 ] ) \
    | map{ el -> [ el.toString(), el ]} \
    | addTuple \
    | view{ it }
}

Step 6 - Add a process parameter

// Step - 6
process addTupleWithParameter {
  input:
    tuple val(id), val(input), val(term)
  output:
    tuple val("${id}"), val(output)
  exec:
    output = input + term
}
workflow step6 {
  Channel.from( [ 1, 2, 3 ] ) \
    | map{ el -> [ el.toString(), el, 10 ]} \
    | addTupleWithParameter \
    | view{ it }
}

Step 7 - Use a Map to store parameters

// Step - 7
process addTupleWithMap {
  input:
    tuple val(id), val(input), val(config)
  output:
    tuple val("${id}"), val(output)
  exec:
    output = (config.operator == "+")
                ? input + config.term
                : input - config.term
}
workflow step7 {
  Channel.from( [ 1, 2, 3 ] ) \
    | map{ el ->
      [
        el.toString(),
        el,
        [ "operator" : "-", "term" : 10 ]
      ] } \
    | addTupleWithMap \
    | view{ it }
}

Step 8 - Use a Map with a process-key

// Step - 8
process addTupleWithProcessHash {
  input:
    tuple val(id), val(input), val(config)
  output:
    tuple val("${id}"), val(output)
  exec:
    def thisConf = config.addTupleWithProcessHash
    output = (thisConf.operator == "+")
                ? input + thisConf.term
                : input - thisConf.term
}
workflow step8 {
  Channel.from( [ 1, 2, 3 ] ) \
    | map{ el ->
      [
        el.toString(),
        el,
        [ "addTupleWithProcessHash" :
          [
            "operator" : "-",
            "term" : 10
          ]
        ]
      ] } \
    | addTupleWithProcessHash \
    | view{ it }
}

Step 9 - Use a ConfigMap with a shell script

// Step - 9
process addTupleWithProcessHashScript {
  input:
    tuple val(id), val(input), val(config)
  output:
    tuple val("${id}"), stdout
  script:
    def thisConf = config.addTupleWithProcessHashScript
    def operator = thisConf.operator
    def term = thisConf.term
    """
    echo \$( expr $input $operator $term )
    """
}
workflow step9 {
  Channel.from( [ 1, 2, 3 ] ) \
    | map{ el ->
      [
        el.toString(),
        el,
        [ "addTupleWithProcessHashScript" :
          [
            "operator" : "-",
            "term" : 10
          ]
        ]
      ] } \
    | addTupleWithProcessHashScript \
    | view{ it }
}

Step 10 - Running a pipeline

// Step - 10
process process_step10a {
  input:
    tuple val(id), val(input), val(term)
  output:
    tuple val("${id}"), val(output), val("${term}")
  exec:
    output = input.toInteger() + term.toInteger()
}
process process_step10b {
  input:
    tuple val(id), val(input), val(term)
  output:
    tuple val("${id}"), val(output), val("${term}")
  exec:
    output = input.toInteger() - term.toInteger()
}
workflow step10 {
  Channel.from( [ 1, 2, 3 ] ) \
    | map{ el -> [ el.toString(), el, 10 ] } \
    | process_step10a \
    | process_step10b \
    | view{ it }
}

// Step - 10a
workflow step10a {
  Channel.from( [ 1, 2, 3 ] ) \
    | map{ el -> [ el.toString(), el, 10 ] } \
    | process_step10a \
    | map{ id, value, term -> [ id, value, 5 ] } \
    | map{ [ it[0], it[1], 5 ] } \
    | map{ x -> [ x[0], x[1], 5 ] } \
    | process_step10b \
    | view{ it }
}

Two equivalent notations, indexing into the tuple or destructuring it:

  ...
  | map{ x -> [ x[0], x[1], 5 ] } \
  ...

  ...
  | map{ id, value, term -> [ id, value, 5 ] } \
  ...

Step 11 - A more generic process

// Step - 11
process process_step11 {
    input:
        tuple val(id), val(input), val(config)
    output:
        tuple val("${id}"), val(output), val("${config}")
    exec:
        if (config.operator == "+")
           output = input.toInteger() + config.term.toInteger()
        else
           output = input.toInteger() - config.term.toInteger()
}
workflow step11 {
  Channel.from( [ 1, 2, 3 ] ) \
    | map{ el -> [ el.toString(), el, [ : ] ] } \
    | process_step11 \
    | map{ id, value, config ->
      [
        id,
        value,
        [ "term" : 11, "operator" : "-" ]
      ] } \
    | process_step11 \
    | view{ [ it[0], it[1] ] }
}

// Step - 11a
include { process_step11 as process_step11a } \
  from './examples/modules/step11.nf'
include { process_step11 as process_step11b } \
  from './examples/modules/step11.nf'
workflow step11a {
  Channel.from( [ 1, 2, 3 ] ) \
    | map{ el -> [ el.toString(), el, [ : ] ] } \
    | map{ id, value, config ->
      [
        id,
        value,
        [ "term" : 5, "operator" : "+" ]
      ] } \
    | process_step11a \
    | map{ id, value, config ->
      [
        id,
        value,
        [ "term" : 11, "operator" : "-" ]
      ] } \
    | process_step11b \
    | view{ [ it[0], it[1] ] }
}

Step 12 - Map/reduce in NextFlow

// Step - 12
process process_step12 {
  input:
    tuple val(id), val(input), val(term)
  output:
    tuple val("${id}"), val(output), val("${term}")
  exec:
    output = input.sum()
}
workflow step12 {
  Channel.from( [ 1, 2, 3 ] ) \
    | map{ el -> [ el.toString(), el, 10 ] } \
    | process_step10a \
    | toList \
    | map{
      [
        "sum",
        it.collect{ id, value, config -> value },
        [ : ]
      ] } \
    | process_step12 \
    | view{ [ it[0], it[1] ] }
}
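The same map/reduce shape can be sketched in plain Python (the per-element process becomes a comprehension, `toList` becomes collecting everything into one triplet):

```python
# Map/reduce in plain Python, mirroring the Nextflow flow above
# (illustrative sketch, not Nextflow code).
events = [1, 2, 3]

# "map" phase: one process invocation per element, adding the term.
mapped = [e + 10 for e in events]

# toList + repackage: everything gathered into a single triplet.
gathered = ["sum", mapped, {}]

# "reduce" phase: one process invocation over the collected values.
total = sum(gathered[1])
```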

Step 13 - Files as input/output

// Step - 13
process process_step13 {
  input:
    tuple val(id), file(input), val(config)
  output:
    tuple val("${id}"), file("output.txt"), val("${config}")
  script:
    """
    a=`cat $input`
    let result="\$a + ${config.term}"
    echo "\$result" > output.txt
    """
}
workflow step13 {
  Channel.fromPath( params.input ) \
    | map{ el ->
      [
        el.baseName.toString(),
        el,
        [ "operator" : "-", "term" : 10 ]
      ]} \
    | process_step13 \
    | view{ [ it[0], it[1] ] }
}

Step 14 - Publishing output

// Step - 14
process process_step14 {
    publishDir "output/"
    input:
        tuple val(id), file(input), val(config)
    output:
        tuple val("${id}"), file("output.txt"), val("${config}")
    script:
        """
        a=`cat $input`
        let result="\$a + ${config.term}"
        echo "\$result" > output.txt
        """
}
workflow step14 {
    Channel.fromPath( params.input ) \
        | map{ el ->
          [
            el.baseName.toString(),
            el,
            [ "operator" : "-", "term" : 10 ]
          ]} \
        | process_step14 \
        | view{ [ it[0], it[1] ] }
}

Step 15 - Make output files/paths unique

// Step - 15
process process_step15 {
    publishDir "output/${config.id}"
    input:
        tuple val(id), file(input), val(config)
    output:
        tuple val("${id}"), file("output.txt"), val("${config}")
    script:
        """
        a=`cat $input`
        let result="\$a + ${config.term}"
        echo "\$result" > output.txt
        """
}
workflow step15 {
    Channel.fromPath( params.input ) \
        | map{ el ->
            [
              el.baseName,
              el,
              [
                "id": el.baseName,
                "operator" : "-",
                "term" : 10
              ]
            ] } \
        | process_step15 \
        | view{ [ it[0], it[1] ] }
}

Step 16 - Where to put params?

Step 17 - Add the output file to params

// Step - 17
process process_step17 {
    publishDir "output"
    input:
        tuple val(id), file(input), val(config)
    output:
        tuple val("${id}"), file(params.output), val("${config}")
    script:
        """
        a=`cat $input`
        let result="\$a + ${config.term}"
        echo "\$result" > ${params.output}
        """
}
workflow step17 {
    Channel.fromPath( params.input ) \
        | map{ el ->
          [
            el.baseName.toString(),
            el,
            [
              "id": el.baseName,
              "operator" : "-",
              "term" : 10
            ]
          ] } \
        | process_step17 \
        | view{ [ it[0], it[1] ] }
}

Step 18 - Add the output filename to the triplet

// Step - 18
process process_step18 {
    publishDir "output"
    input:
        tuple val(id), file(input), val(config)
    output:
        tuple val("${id}"), file("${config.output}"), val("${config}")
    script:
        """
        a=`cat $input`
        let result="\$a + ${config.term}"
        echo "\$result" > ${config.output}
        """
}
workflow step18 {
    Channel.fromPath( params.input ) \
        | map{ el -> [
            el.baseName.toString(),
            el,
            [
                "output" : "output_from_${el.baseName}.txt",
                "id": el.baseName,
                "operator" : "-",
                "term" : 10
            ]
          ]} \
        | process_step18 \
        | view{ [ it[0], it[1] ] }
}

Step 19 - Use a closure

// Step - 19
def out_from_in = { it -> it.baseName + "-out.txt" }
process process_step19 {
    publishDir "output"
    input:
        tuple val(id), file(input), val(config)
    output:
        tuple val("${id}"), file("${out}"), val("${config}")
    script:
        out = out_from_in(input)
        """
        a=`cat $input`
        let result="\$a + ${config.term}"
        echo "\$result" > ${out}
        """
}
workflow step19 {
    Channel.fromPath( params.input ) \
        | map{ el -> [
            el.baseName.toString(),
            el,
            [
                "id": el.baseName,
                "operator" : "-",
                "term" : 10
            ]
          ]} \
        | process_step19 \
        | view{ [ it[0], it[1] ] }
}

Step 20 - The order of events in a stream

// Step - 20a
process process_step20 {
    input:
        tuple val(id), val(input), val(term)
    output:
        tuple val("${id}"), val(output), val("${term}")
    exec:
        output = input[0] / input[1]
}
workflow step20a {
    Channel.from( [ 1, 2 ] ) \
        | map{ el -> [ el.toString(), el, 10 ] } \
        | process_step10a \
        | toList \
        | map{ [
                  "sum",
                  it.collect{ id, value, config -> value },
                  [ : ]
               ] } \
        | process_step20 \
        | view{ [ it[0], it[1] ] }
}

// Step - 20b
workflow step20b {
    Channel.from( [ 1, 2 ] ) \
        | map{ el -> [ el.toString(), el, 10 ] } \
        | process_step10a \
        | toSortedList{ a,b -> a[0] <=> b[0] } \
        | map{ [ "sum", it.collect{ id, value, config -> value }, [ : ] ] } \
        | process_step20 \
        | view{ [ it[0], it[1] ] }
}

Step 21 - Is the triplet really necessary?

// Step - 21
process process_step21 {
    input:
        val(in1)
        val(in2)
    output:
        val(out)
    exec:
        out = in1 + in2
}
workflow step21 {
    ch1_ = Channel.from( [1, 2, 3, 4, 5 ] )
    ch2_ = Channel.from( ["a", "b", "c", "d" ] )
    process_step21(ch1_, ch2_) | toSortedList | view
}

// Step - 21a
workflow step21a {
    ch1_ = Channel.from( [1, 2, 3, 4, 5 ] ) | add
    ch2_ = Channel.from( ["a", "b", "c", "d" ] )
    process_step21(ch1_, ch2_) | toSortedList | view
}

Step 22 - Toward generic processes

// Step - 22
process process_step22 {
    publishDir "output"
    input:
        tuple val(id), file(input), val(config)
    output:
        tuple val("${id}"), file("${config.output}"), val("${config}")
    script:
        """
        ${config.cli}
        """
}
workflow step22 {
    Channel.fromPath( params.input ) \
        | map{ el -> [
            el.baseName.toString(),
            el,
            [
                "cli": "cat input.txt > output22.txt",
                "output": "output22.txt"
            ]
          ]} \
        | process_step22 \
        | view{ [ it[0], it[1] ] }
}

params {
  ...
  cellranger {
    name = "cellranger"
    container = "mapping/cellranger:4.0.0-1"
    command = "cellranger"
    arguments {
      mode {
        name = "mode"
        otype = "--"
        description = "The run mode"
        value = "count"
        ...
      }
      input {
        name = "input"
        otype = "--"
        description = "Path of folder created by mkfastq or bcl2fastq"
        required = false
        ...
      }
      id {
        name = "id"
        otype = "--"
        description = "Output folder for the count files."
        value = "cr"
        required = false
        ...
      }
      ...
    }
    ...
  }
}

Step 23 - More than one input

[ "input": "<fastq-dir>", "reference": "<reference-dir>" ]
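A minimal Python sketch of the idea (hypothetical names, not DiFlow's implementation): the data slot of the triplet simply becomes a map of named inputs, so the module signature never changes:

```python
# With multiple inputs, the data slot holds a dict of named inputs
# (illustrative sketch; names are hypothetical).
def module(triplet):
    id_, data, config = triplet
    merged = data["input"] + "+" + data["reference"]
    return [id_, merged, config]

out = module(["s1", {"input": "fastq-dir", "reference": "ref-dir"}, {}])
```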

Step 24 - workflow instead of process

process cellranger_process {
  ...
  container "${params.dockerPrefix}${container}"
  publishDir "${params.output}/processed_data/${id}/", mode: 'copy', overwrite: true
  input:
    tuple val(id), path(input), val(output), val(container), val(cli)
  output:
    tuple val("${id}"), path("${output}")
  script:
    """
    export PATH="${moduleDir}:\$PATH"
    $cli
    """
}
workflow cellranger {
  take:
    id_input_params_
  main:
    ...
    result_ =  ...
  emit:
    result_
}

Putting it all together

A module combine_plots

combine_plots/
  main.nf
  nextflow.config
  [script(s)]

nextflow.config:

params {
  combine_plots__input = "/path/to/my/dir"
  combine_plots__output = "output.webm"
  combine_plots__framerate = "4"
  id = ""
  dockerPrefix = ""
  input = ""
  output = ""
  combine_plots {
    name = "combine_plots"
    container = "jrottenberg/ffmpeg:latest"
    command = "combine_plots"
    // ...
    arguments {
      input {
        name = "input"
        otype = "--"
        description = "A list of images."
        value = "${params.combine_plots__input}"
        required = true
        type = "file"
        // ...
      }
      output {
        name = "output"
        otype = "--"
        description = "A path to output the movie to."
        value = "${params.combine_plots__output}"
        required = true
        type = "file"
        // ...
      }
      framerate {
        name = "framerate"
        otype = "--"
        description = "Number of frames per second."
        value = "${params.combine_plots__framerate}"
        required = false
        type = "integer"
        // ...
      }
    }
  }
}

main.nf:

nextflow.preview.dsl=2
import java.nio.file.Paths

// ... checks

def renderCLI(command, arguments) {
  // ... based on nextflow.config, render CLI
}

// files is either a String, a List<String> or a HashMap<String,String>
def outFromIn(files) {
  // ... derive output from input filename
}

// In: Hashmap key -> DataObjects
// Out: Arrays of DataObjects
def overrideInput(params, str) {
  // ... internal consistency
}

def overrideOutput(params, str) {
  // ... internal consistency
}

process combine_plots_process {
  // ...
  container "${params.dockerPrefix}${container}"
  publishDir "${params.output}/${id}/", mode: 'copy', overwrite: true
  input:
    tuple val(id), path(input), val(output), val(container), val(cli)
  output:
    tuple val("${id}"), path("${output}")
  script:
    """
    # ...
    $cli
    """
}

workflow combine_plots {
    take:
      id_input_params_
    main:
      def key = "combine_plots"
      // ... workflow logic
    emit:
      result_
}

workflow {
  // ... Testing procedure(s)
}

Boilerplate?

We don’t write these files ourselves; they are generated!

combine_plots/config.vsh.yaml:

functionality:
  name: combine_plots
  namespace: civ6_save_renderer
  description: Combine multiple images into a movie using ffmpeg.
  arguments:
    - name: "--input"
      alternatives: [-i]
      type: file
      required: true
      default: "/path/to/my/dir"
      must_exist: true
      multiple: true
      description: A list of images.
    - name: "--output"
      alternatives: [-o]
      type: file
      required: true
      default: "output.webm"
      direction: output
      description: A path to output the movie to.
    - name: "--framerate"
      alternatives: [-f]
      type: integer
      default: 4
      description: Number of frames per second.
  resources:
    - type: bash_script
      path: script.sh
platforms:
  - type: docker
    image: jrottenberg/ffmpeg
  - type: nextflow
    image: jrottenberg/ffmpeg
    publish: true
  - type: native

combine_plots/script.sh:

#!/bin/bash

# render movie
inputs=`echo $par_input | sed 's#:# -i #g'`
ffmpeg -framerate $par_framerate -f image2 -i $inputs -c:v libvpx-vp9 -pix_fmt yuva420p -y $par_output

Viash

A Pipeline: civ6_postgame

https://github.com/data-intuitive/viash_docs/tree/master/examples/civ6_postgame

Original bash script:

#!/bin/bash

# run beforehand:
# viash ns build -p docker --setup
# viash ns build -p nextflow

BIN=target/docker/civ6_save_renderer
input_dir="data"
output_dir="output"

mkdir -p "$output_dir"

function msg {
  echo -e "\033[32m>>>>>>> $1\e[0m"
}

for save_file in $input_dir/*.Civ6Save; do
  file_basename=$(basename $save_file)
  yaml_file="$output_dir/${file_basename/Civ6Save/yaml}"
  tsv_file="$output_dir/${file_basename/Civ6Save/tsv}"
  pdf_file="$output_dir/${file_basename/Civ6Save/pdf}"
  png_file="$output_dir/${file_basename/Civ6Save/png}"

  if [ ! -f "$yaml_file" ]; then
    msg "parse header '$save_file'"
    $BIN/parse_header/parse_header -i "$save_file" -o "$yaml_file"
  fi

  if [ ! -f "$tsv_file" ]; then
    msg "parse map '$save_file'"
    $BIN/parse_map/parse_map -i "$save_file" -o "$tsv_file"
  fi

  if [ ! -f "$pdf_file" ]; then
    msg "plot map '$save_file'"
    $BIN/plot_map/plot_map -y "$yaml_file" -t "$tsv_file" -o "$pdf_file"
  fi

  if [ ! -f "$png_file" ]; then
    msg "convert plot '$save_file'"
    $BIN/convert_plot/convert_plot -i "$pdf_file" -o "$png_file"
  fi
done

png_inputs=`find "$output_dir" -name "*.png" | sed "s#.*#&:#" | tr -d '\n' | sed 's#:$#\n#'`

msg "combine plots"
$BIN/combine_plots/combine_plots -i "$png_inputs" -o "$output_dir/movie.webm" --framerate 1

nextflow.config

includeConfig 'target/nextflow/civ6_save_renderer/plot_map/nextflow.config'
includeConfig 'target/nextflow/civ6_save_renderer/combine_plots/nextflow.config'
includeConfig 'target/nextflow/civ6_save_renderer/convert_plot/nextflow.config'
includeConfig 'target/nextflow/civ6_save_renderer/parse_header/nextflow.config'
includeConfig 'target/nextflow/civ6_save_renderer/parse_map/nextflow.config'

docker {
  runOptions = "-i -v ${baseDir}:${baseDir}"
}

main.nf

nextflow.preview.dsl=2

import java.nio.file.Paths

include  plot_map       from  './target/nextflow/civ6_save_renderer/plot_map/main.nf'       params(params)
include  combine_plots  from  './target/nextflow/civ6_save_renderer/combine_plots/main.nf'  params(params)
include  convert_plot   from  './target/nextflow/civ6_save_renderer/convert_plot/main.nf'   params(params)
include  parse_header   from  './target/nextflow/civ6_save_renderer/parse_header/main.nf'   params(params)
include  parse_map      from  './target/nextflow/civ6_save_renderer/parse_map/main.nf'      params(params)
include  rename         from  './src/utils.nf'

workflow {

    if (params.debug == true)
        println(params)

    if (!params.containsKey("input") || params.input == "") {
        exit 1, "ERROR: Please provide a --input parameter pointing to .Civ6Save file(s)"
    }

    def input_ = Channel.fromPath(params.input)

    def listToTriplet = { it -> [ "all", it.collect{ a -> a[1] }, params ] }

    input_ \
        | map{ it -> [ it.baseName , it ] } \
        | map{ it -> [ it[0] , it[1], params ] } \
        | ( parse_header & parse_map ) \
        | join \
        | map{ id, parse_headerOut, params1, parse_mapOut, params2 ->
            [ id, [ "yaml" : parse_headerOut, "tsv": parse_mapOut ], params1 ] } \
        | plot_map \
        | convert_plot \
        | rename \
        | toSortedList{ a,b -> a[0] <=> b[0] }  \
        | map( listToTriplet ) \
        | combine_plots

}

Running the pipeline

viash ns build -p docker --setup
viash ns build -p nextflow
nextflow run . \
  --input "data/*.Civ6Save" \
  --output "output/" \
  --combine_plots__framerate 1

Extra

Illustration

process process_join_process {

    input:
        tuple val(id1), val(in1)
        tuple val(id2), val(in2)
    output:
        tuple val("${id1}"), val(out)
    exec:
        out = in1 + "/" + in2

}

workflow join_process {

    ch1_ = Channel.from( ["1", "2", "3", "4" ] ) | map{ [ it, it.toString() ] } \
            | addTuple
    ch2_ = Channel.fromList( [ ["1", "a"] , ["2", "b"], ["3", "c"], ["4", "d"] ] )

    process_join_process(ch1_, ch2_) \
      | toSortedList | view

}
N E X T F L O W  ~  version 20.10.0
Launching `./main.nf` [berserk_bohr] - revision: 4ff07776e0
WARN: DSL 2 IS AN EXPERIMENTAL FEATURE UNDER DEVELOPMENT -- SYNTAX MAY CHANGE IN FUTURE RELEASE
executor >  local (8)
[50/e255a0] process > join_process:addTuple (4)             [100%] 4 of 4 ✔
[c4/6cc46b] process > join_process:process_join_process (4) [100%] 4 of 4 ✔
[[1, 11/b], [2, 21/a], [3, 31/c], [4, 41/d]]

process process_join_stream {

    input:
        tuple val(id), val(inMap)
    output:
        tuple val("$id"), val(out)
    exec:
        out = inMap.left + inMap.right

}

workflow join_stream {

    ch1_ = Channel.from( ["1", "2", "3", "4" ] ) | map{ [ it, it.toString() ] } \
            | addTuple
    ch2_ = Channel.fromList( [ ["1", "a"] , ["2", "b"], ["3", "c"], ["4", "d"] ] )

    ch1_.join(ch2_) \
      | map{ id, left, right -> [ id, [ "left" : left, "right" : right ] ] } \
      | process_join_stream \
      | toSortedList \
      | view

}
N E X T F L O W  ~  version 20.10.0
Launching `./main.nf` [gigantic_bassi] - revision: 4ff07776e0
WARN: DSL 2 IS AN EXPERIMENTAL FEATURE UNDER DEVELOPMENT -- SYNTAX MAY CHANGE IN FUTURE RELEASE
executor >  local (8)
[22/57f9ae] process > join_stream:addTuple (1)            [100%] 4 of 4 ✔
[9a/956c37] process > join_stream:process_join_stream (4) [100%] 4 of 4 ✔
[[1, 11a], [2, 21b], [3, 31c], [4, 41d]]
